In [1]:
import tensorflow as tf
from tensorflow.python.framework import dtypes
from tensorflow.contrib.learn.python.learn.datasets import base
import gzip
import os
import numpy
from six.moves import xrange
from sklearn.preprocessing import OneHotEncoder
import matplotlib.pyplot as plt
import datetime
%matplotlib inline
In [2]:
def _read32(bytestream):
dt = numpy.dtype(numpy.uint32).newbyteorder('>')
return numpy.frombuffer(bytestream.read(4), dtype=dt)[0]
def extract_images(f):
"""Extract the images into a 4D uint8 numpy array [index, y, x, depth].
Args:
f: A file object that can be passed into a gzip reader.
Returns:
data: A 4D uint8 numpy array [index, y, x, depth].
Raises:
ValueError: If the bytestream does not start with 2051.
"""
print('Extracting', f.name)
with gzip.GzipFile(fileobj=f) as bytestream:
magic = _read32(bytestream)
if magic != 2051:
raise ValueError('Invalid magic number %d in MNIST image file: %s' %
(magic, f.name))
num_images = _read32(bytestream)
rows = _read32(bytestream)
cols = _read32(bytestream)
buf = bytestream.read(rows * cols * num_images)
data = numpy.frombuffer(buf, dtype=numpy.uint8)
data = data.reshape(num_images, rows, cols, 1)
return data
def extract_labels(f, one_hot=False, num_classes=10):
"""Extract the labels into a 1D uint8 numpy array [index].
Args:
f: A file object that can be passed into a gzip reader.
one_hot: If True, return the labels one-hot encoded.
num_classes: Number of classes for the one hot encoding.
Returns:
labels: a 1D uint8 numpy array.
Raises:
ValueError: If the bytestream doesn't start with 2049.
"""
print('Extracting', f.name)
with gzip.GzipFile(fileobj=f) as bytestream:
magic = _read32(bytestream)
if magic != 2049:
raise ValueError('Invalid magic number %d in MNIST label file: %s' %
(magic, f.name))
num_items = _read32(bytestream)
buf = bytestream.read(num_items)
labels = numpy.frombuffer(buf, dtype=numpy.uint8)
if one_hot:
return dense_to_one_hot(labels, num_classes)
return labels
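# Note: extract_labels calls dense_to_one_hot when one_hot=True, but that helper is not
# defined anywhere in this notebook (it is only needed for the one-hot path, which is not
# used below). A minimal sketch of the assumed helper:
def dense_to_one_hot(labels_dense, num_classes):
    """Convert class labels from scalars to one-hot vectors."""
    num_labels = labels_dense.shape[0]
    index_offset = numpy.arange(num_labels) * num_classes
    labels_one_hot = numpy.zeros((num_labels, num_classes), dtype=numpy.uint8)
    labels_one_hot.flat[index_offset + labels_dense.ravel()] = 1
    return labels_one_hot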
class DataSet(object):
def __init__(self,
images,
labels,
fake_data=False,
one_hot=False,
dtype=dtypes.float32,
reshape=True):
"""Construct a DataSet.
one_hot arg is used only if fake_data is true. `dtype` can be either
`uint8` to leave the input as `[0, 255]`, or `float32` to rescale into
`[0, 1]`.
"""
dtype = dtypes.as_dtype(dtype).base_dtype
if dtype not in (dtypes.uint8, dtypes.float32):
raise TypeError('Invalid image dtype %r, expected uint8 or float32' %
dtype)
if fake_data:
self._num_examples = 10000
self.one_hot = one_hot
else:
assert images.shape[0] == labels.shape[0], (
'images.shape: %s labels.shape: %s' % (images.shape, labels.shape))
self._num_examples = images.shape[0]
# Convert shape from [num examples, rows, columns, depth]
# to [num examples, rows*columns] (assuming depth == 1)
if reshape:
assert images.shape[3] == 1
images = images.reshape(images.shape[0],
images.shape[1] * images.shape[2])
if dtype == dtypes.float32:
# Convert from [0, 255] -> [0.0, 1.0].
images = images.astype(numpy.float32)
images = numpy.multiply(images, 1.0 / 255.0)
self._images = images
self._labels = labels
self._epochs_completed = 0
self._index_in_epoch = 0
@property
def images(self):
return self._images
@property
def labels(self):
return self._labels
@property
def num_examples(self):
return self._num_examples
@property
def epochs_completed(self):
return self._epochs_completed
def next_batch(self, batch_size, fake_data=False, shuffle=True):
"""Return the next `batch_size` examples from this data set."""
if fake_data:
fake_image = [1] * 784
if self.one_hot:
fake_label = [1] + [0] * 9
else:
fake_label = 0
return [fake_image for _ in xrange(batch_size)], [fake_label for _ in xrange(batch_size)]
start = self._index_in_epoch
# Shuffle for the first epoch
if self._epochs_completed == 0 and start == 0 and shuffle:
perm0 = numpy.arange(self._num_examples)
numpy.random.shuffle(perm0)
self._images = self.images[perm0]
self._labels = self.labels[perm0]
# Go to the next epoch
if start + batch_size > self._num_examples:
# Finished epoch
self._epochs_completed += 1
# Get the rest examples in this epoch
rest_num_examples = self._num_examples - start
images_rest_part = self._images[start:self._num_examples]
labels_rest_part = self._labels[start:self._num_examples]
# Shuffle the data
if shuffle:
perm = numpy.arange(self._num_examples)
numpy.random.shuffle(perm)
self._images = self.images[perm]
self._labels = self.labels[perm]
# Start next epoch
start = 0
self._index_in_epoch = batch_size - rest_num_examples
end = self._index_in_epoch
images_new_part = self._images[start:end]
labels_new_part = self._labels[start:end]
return numpy.concatenate((images_rest_part, images_new_part), axis=0), numpy.concatenate((labels_rest_part, labels_new_part), axis=0)
else:
self._index_in_epoch += batch_size
end = self._index_in_epoch
return self._images[start:end], self._labels[start:end]
def read_data_sets(train_dir,
fake_data=False,
one_hot=False,
dtype=dtypes.float32,
reshape=True,
validation_size=5000):
if fake_data:
def fake():
return DataSet([], [], fake_data=True, one_hot=one_hot, dtype=dtype)
train = fake()
validation = fake()
test = fake()
return base.Datasets(train=train, validation=validation, test=test)
TRAIN_IMAGES = 'train-images-idx3-ubyte.gz'
TRAIN_LABELS = 'train-labels-idx1-ubyte.gz'
TEST_IMAGES = 't10k-images-idx3-ubyte.gz'
TEST_LABELS = 't10k-labels-idx1-ubyte.gz'
local_file = os.path.join(train_dir, TRAIN_IMAGES)
# base.maybe_download(TRAIN_IMAGES, train_dir,
# SOURCE_URL + TRAIN_IMAGES)
with open(local_file, 'rb') as f:
train_images = extract_images(f)
local_file = os.path.join(train_dir, TRAIN_LABELS)
# local_file = base.maybe_download(TRAIN_LABELS, train_dir,
# SOURCE_URL + TRAIN_LABELS)
with open(local_file, 'rb') as f:
train_labels = extract_labels(f, one_hot=one_hot)
local_file = os.path.join(train_dir, TEST_IMAGES)
# local_file = base.maybe_download(TEST_IMAGES, train_dir,
# SOURCE_URL + TEST_IMAGES)
with open(local_file, 'rb') as f:
test_images = extract_images(f)
local_file = os.path.join(train_dir, TEST_LABELS)
# local_file = base.maybe_download(TEST_LABELS, train_dir,
# SOURCE_URL + TEST_LABELS)
with open(local_file, 'rb') as f:
test_labels = extract_labels(f, one_hot=one_hot)
if not 0 <= validation_size <= len(train_images):
raise ValueError(
'Validation size should be between 0 and {}. Received: {}.'
.format(len(train_images), validation_size))
validation_images = train_images[:validation_size]
validation_labels = train_labels[:validation_size]
train_images = train_images[validation_size:]
train_labels = train_labels[validation_size:]
train = DataSet(train_images, train_labels, dtype=dtype, reshape=reshape)
validation = DataSet(validation_images,
validation_labels,
dtype=dtype,
reshape=reshape)
test = DataSet(test_images, test_labels, dtype=dtype, reshape=reshape)
return base.Datasets(train=train, validation=validation, test=test)
In [3]:
MNIST = read_data_sets("../../noMNIST/data")
In [20]:
img, lbl = MNIST.train.next_batch(batch_size)
In [11]:
sample = img[1].reshape([28,28])
plt.imshow(sample, cmap=plt.get_cmap('gray'))
Out[11]:
In [4]:
# Create one hot encoder for labels
enc = OneHotEncoder()
enc.fit([[0], [1], [2], [3], [4], [5], [6], [7], [8], [9]])
Out[4]:
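The fitted encoder maps an integer label to a one-hot row of length 10; transform expects a 2D array of shape [n_samples, 1] and returns a sparse matrix, so .toarray() is used later to obtain dense rows. A quick illustrative check:
In [ ]:
enc.transform([[3]]).toarray()
# -> array([[ 0.,  0.,  0.,  1.,  0.,  0.,  0.,  0.,  0.,  0.]])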
In [5]:
learning_rate = 0.01
batch_size = 1000
n_epochs = 5
In [6]:
# Placeholders for the input images (flattened to 784 = 28*28 pixels) and the one-hot labels (10 classes)
X = tf.placeholder(tf.float32, [None, 784], 'X')
Y = tf.placeholder(tf.float32, [None, 10], 'Y')
In [7]:
# Define weights and biases for softmax regression on polynomial pixel features
W = tf.Variable(tf.random_normal(shape=[784, 10], stddev=0.01), name="weights")
Wb = tf.Variable(tf.random_normal(shape=[784, 10], stddev=0.01), name="weights_2")
Wc = tf.Variable(tf.random_normal(shape=[784, 10], stddev=0.01), name="weights_3")
b = tf.Variable(tf.zeros([1, 10]), name="bias")
In [8]:
# Quadratic variant: y = X^2*Wb + X*W + b
#y = tf.matmul(tf.multiply(X, X), Wb) + tf.matmul(X, W) + b
# Cubic variant used here: y = X^3*Wb + X*W + b (Wc is defined above but left unused)
y = tf.matmul(tf.multiply(tf.multiply(X, X), X), Wb) + tf.matmul(X, W) + b
In [9]:
y_pred = tf.nn.softmax(y)  # predicted class probabilities, used only for the accuracy check below
# softmax_cross_entropy_with_logits applies the softmax internally, so the raw logits y are passed in
entropy = tf.nn.softmax_cross_entropy_with_logits(labels=Y, logits=y)
loss = tf.reduce_mean(entropy) # Mean for all samples in minibatch
tf.summary.scalar('loss', loss)
Out[9]:
In [10]:
#preds = tf.nn.softmax(logits_batch)
correct_preds = tf.equal(tf.argmax(Y, 1), tf.argmax(y_pred, 1))
# Note: this is the *count* of correct predictions (not a fraction); it is converted to a percentage below
accuracy = tf.reduce_sum(tf.cast(correct_preds, tf.float32))
tf.summary.scalar('accuracy', accuracy)
Out[10]:
In [11]:
optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate).minimize(loss)
#optimizer = tf.train.GradientDescentOptimizer(learning_rate=learning_rate).minimize(loss)
In [12]:
init = tf.global_variables_initializer()
Implement sample shuffling?

Observations:
- Fewer samples per batch lead to faster loss minimization.
- A large number of epochs is not needed for small batches; the loss is almost flat after two epochs.
- There is less fluctuation in the loss for larger batch sizes.
- The Adam optimizer gets better results for larger batch sizes.
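Note on the first point: DataSet.next_batch defined above already reshuffles the training examples at the start of each epoch when shuffle=True (the default), so batch-level shuffling comes for free. An unshuffled run for comparison would only require, for example:
In [ ]:
X_batch, Y_batch_labels = MNIST.train.next_batch(batch_size, shuffle=False)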
In [13]:
with tf.Session() as sess:
sess.run(init)
n_batches = int(MNIST.train.num_examples/batch_size)
total_correct = 0
#losses = []
x_test, y_test = MNIST.test.images, MNIST.test.labels
Y_test_batch = []
for arr in y_test:
Y_test_batch.append(list(enc.transform([[arr]]).toarray()[0]))  # transform expects a 2D array
merged = tf.summary.merge_all()
# writer = tf.summary.FileWriter('./logs', sess.graph)
run_var = datetime.datetime.now()
writer = tf.summary.FileWriter('%s/%s' % ('./logs', run_var), sess.graph)
for i in range(n_epochs):
for batch_id in range(n_batches):
X_batch, Y_batch_labels = MNIST.train.next_batch(batch_size)
Y_batch = []
for arr in Y_batch_labels:
Y_batch.append(list(enc.transform([[arr]]).toarray()[0]))  # transform expects a 2D array
_, summary = sess.run([optimizer, merged], feed_dict={X: X_batch, Y: Y_batch})
#_, summary, actual_loss = sess.run([optimizer, merged, loss], feed_dict={X: X_batch, Y: Y_batch})
writer.add_summary(summary, i * n_batches + batch_id)  # log with a global step so summary points do not overlap
# get results on test data
m, acc = sess.run([merged, accuracy], feed_dict={X: x_test, Y: Y_test_batch})
acc_pct = 100*acc/len(MNIST.test.labels)
print('Accuracy for epoch ', i+1, ': ', acc_pct, '%')
writer.close()
#print('Epoch number ', i+1, ' has Loss:', loss_batch, ' Acuracy: ', total_correct)
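The loss and accuracy summaries are written to a per-run subdirectory of ./logs, so the training curves of different runs can be compared in TensorBoard, e.g. by running tensorboard --logdir=./logs from a terminal and opening the page it serves (by default at http://localhost:6006).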
In [ ]: